Consistent estimation of task duration between stealing, adaptive and occupancy calculation #9000

Merged

hendrikmakait
Member

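We've noticed that work stealing could ping-pong a task between two workers when tasks with long execution durations but no recorded average duration were in processing. This PR fixes that by making stealing, adaptive scaling, and the occupancy calculation use the same duration estimate.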

  • Tests added / passed
  • Passes pre-commit run --all-files
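
To illustrate the ping-pong described above, here is a toy model in which the occupancy calculation and the stealing decision use different duration estimates for the same task. It is not the scheduler's actual logic: the `simulate` function, the steal rule, and the numbers are all hypothetical.

```python
def simulate(occupancy_estimate: float, stealing_estimate: float, rounds: int = 4) -> list[str]:
    """Return which worker holds the single task after each balancing round."""
    holder, other = "A", "B"
    history = []
    for _ in range(rounds):
        # The holder is charged the occupancy estimate; the other worker is idle.
        occ_holder = occupancy_estimate
        occ_other = 0.0
        # Hypothetical steal rule: move the task if the idle worker would be
        # done sooner according to the *stealing* estimate.
        if occ_other + stealing_estimate < occ_holder:
            holder, other = other, holder
        history.append(holder)
    return history


# Occupancy believes the task is long, stealing believes it is short.
inconsistent = simulate(occupancy_estimate=200.0, stealing_estimate=0.5)
# Both components agree on the estimate.
consistent = simulate(occupancy_estimate=200.0, stealing_estimate=200.0)

print(inconsistent)
print(consistent)
```

With mismatched estimates the task changes hands every round; once both sides use the same number, moving it no longer looks like an improvement and it stays where it is.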

Contributor

github-actions bot commented Feb 3, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    27 files  ±0       27 suites  ±0   11h 33m 27s ⏱️ +2m 11s
 4 117 tests  +1    4 000 ✅ ±0     111 💤 ±0   6 ❌ +1
51 628 runs   +13  49 324 ✅ +14  2 296 💤 ±0   8 ❌ -1

For more details on these failures, see this check.

Results for commit 2bd934e. ± Comparison against base commit 5589049.

This pull request removes 1 and adds 2 tests. Note that renamed tests count towards both.
Removed:
distributed.tests.test_scheduler ‑ test_get_task_duration
Added:
distributed.tests.test_scheduler ‑ test_get_prefix_duration
distributed.tests.test_steal ‑ test_do_not_ping_pong

♻️ This comment has been updated with latest results.

hendrikmakait changed the title from "Consistent estimation of task duration between stealing and occupancy calculation" to "Consistent estimation of task duration between stealing, adaptive and occupancy calculation" on Feb 3, 2025
@@ -2536,13 +2545,6 @@ def _transition_processing_memory(
                 action=startstop["action"],
             )

-        s = self.unknown_durations.pop(ts.prefix.name, set())
Member Author

I've moved this into the stealing plugin.

@@ -1931,22 +1925,37 @@ def total_occupancy(self) -> float:
             self._network_occ_global,
         )

+    def _get_prefix_duration(self, prefix: TaskPrefix) -> float:
Member Author

This is the single source of truth for the duration estimation.
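
A minimal sketch of what a single shared estimator can look like, with every consumer calling the same function. `duration_average` and `UNKNOWN_TASK_DURATION` are names that appear in this diff; the `Prefix` class, the `max_exec_time` field, the "negative means unknown" convention, the fallback order, and the 0.5 s default are assumptions made for illustration, not the merged implementation.

```python
from dataclasses import dataclass

UNKNOWN_TASK_DURATION = 0.5  # assumed default, in seconds


@dataclass
class Prefix:
    name: str
    duration_average: float = -1.0  # assumption: negative means "not yet known"
    max_exec_time: float = -1.0     # assumption: longest execution observed so far


def get_prefix_duration(prefix: Prefix) -> float:
    """One estimator shared by every consumer (hypothetical fallback rules)."""
    if prefix.duration_average >= 0:
        return prefix.duration_average
    if prefix.max_exec_time > 0:
        # No average yet, but a task has already run this long.
        return prefix.max_exec_time
    return UNKNOWN_TASK_DURATION


def queued_occupancy(prefixes: list[Prefix]) -> float:
    # Occupancy-style consumer: sums the shared estimate.
    return sum(get_prefix_duration(p) for p in prefixes)


def steal_cost(prefix: Prefix, comm_cost: float) -> float:
    # Stealing-style consumer: same estimate plus a transfer cost.
    return get_prefix_duration(prefix) + comm_cost


unknown = Prefix("x")
known = Prefix("y", duration_average=2.0)
long_running = Prefix("z", max_exec_time=30.0)
total = queued_occupancy([unknown, known, long_running])
```

Because both consumers go through `get_prefix_duration`, they cannot disagree about how long a given prefix is expected to take.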

@@ -1674,9 +1674,6 @@ class SchedulerState:
     #: Subset of tasks that exist in memory on more than one worker
     replicated_tasks: set[TaskState]

-    #: Tasks with unknown duration, grouped by prefix
-    #: {task prefix: {ts, ts, ...}}
-    unknown_durations: dict[str, set[TaskState]]
Member Author

This has been moved into stealing.
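
For readers unfamiliar with this bookkeeping, the mapping shape documented on the removed attribute, `{task prefix: {ts, ts, ...}}`, can be owned by whichever component needs it. A rough sketch follows; the `StealingBookkeeping` class and its method names are hypothetical, and task states are stood in for by plain strings.

```python
from collections import defaultdict


class StealingBookkeeping:
    """Hypothetical holder for tasks whose prefix has no known duration yet."""

    def __init__(self) -> None:
        # {task prefix name: {task key, task key, ...}}
        self.unknown_durations: dict[str, set[str]] = defaultdict(set)

    def defer(self, prefix_name: str, task_key: str) -> None:
        # Remember a task that cannot be costed until its prefix has a duration.
        self.unknown_durations[prefix_name].add(task_key)

    def duration_learned(self, prefix_name: str) -> set[str]:
        # Same pop-with-default pattern as the line removed from the scheduler:
        # returns the deferred tasks and forgets the prefix in one step.
        return self.unknown_durations.pop(prefix_name, set())


book = StealingBookkeeping()
book.defer("inc", "inc-1")
book.defer("inc", "inc-2")
ready = book.duration_learned("inc")
again = book.duration_learned("inc")
```

The second call returns an empty set rather than raising, which is why the default argument to `pop` matters.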

@@ -236,8 +236,6 @@ class TaskState:
     #: The next state of the task. It is not None iff :attr:`state` == resumed.
     next: Literal["fetch", "waiting", None] = None

-    #: Expected duration of the task
-    duration: float | None = None
Member Author

This isn't used anywhere.

-        queued_occupancy += self.UNKNOWN_TASK_DURATION
-    else:
-        queued_occupancy += ts.prefix.duration_average
+    queued_occupancy += self._get_prefix_duration(ts.prefix)
Member Author

I don't have a test for this, but the old version was definitely inconsistent.

Collaborator

phofl left a comment

small comments, but looks good to me


prefix = ts.prefix
duration = self.scheduler._get_prefix_duration(prefix)
if cost_multiplier is None:
Collaborator

shouldn't we start with this?

Member Author

Fair point

Member Author

Adjusted.

return

s = self.unknown_durations.get(prefix.name)
if s is None:
Collaborator

Is this equivalent to if prefix.name not in self.unknown_durations? If yes, that would be easier to read in the future.

Member Author

I guess this could be rewritten, I've just moved that bit.

Member Author

Adjusted.
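
On the equivalence question in this thread: for a dict whose values are never `None`, checking `d.get(key) is None` and checking `key not in d` give the same answer; they only diverge if `None` can be stored as a value. A small illustration with a hypothetical mapping (the helper names are made up for this example):

```python
def missing_via_get(d: dict, key: str) -> bool:
    return d.get(key) is None


def missing_via_in(d: dict, key: str) -> bool:
    return key not in d


# Values are always sets, as in {task prefix: {ts, ts, ...}}.
unknown_durations = {"inc": {"inc-1"}}

agree_present = missing_via_get(unknown_durations, "inc") == missing_via_in(unknown_durations, "inc")
agree_absent = missing_via_get(unknown_durations, "dec") == missing_via_in(unknown_durations, "dec")

# The two checks disagree only when None is a stored value.
with_none = {"inc": None}
disagree = missing_via_get(with_none, "inc") != missing_via_in(with_none, "inc")
```

Since the mapping here only ever holds sets, the membership test is a safe and more readable replacement.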

hendrikmakait merged commit b86b714 into dask:main on Feb 4, 2025
28 of 32 checks passed
hendrikmakait deleted the fix-stealing-task-duratino-estimate branch on February 4, 2025 12:51
2 participants